package com.uxsino.simo.query;

import com.uxsino.reactorq.model.IndicatorValue;
import com.uxsino.reactorq.processor.QueueProcessor;
import com.uxsino.simo.incremental.IncrementalUtil;
import com.uxsino.simo.indicator.Indicator;
import com.uxsino.simo.indicator.NEIndicatorDepo;
import com.uxsino.simo.indicator.NSIndicatorDepo;
import com.uxsino.simo.indicator.expression.ExprEvaluator;
import com.uxsino.simo.networkentity.EntityInfo;
import com.uxsino.simo.task.TaskInfo;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Consumer;

public class QueryContext {

    // Logger named after the runtime class of this instance.
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    // Namespace-level depo; also used to reach the domain (entities) and the indicator namespace.
    private NSIndicatorDepo globalDepo;

    // Querier handed in at construction and exposed through getQuerier().
    private Querier querier;

    // Per-entity depos created lazily by getNEIndicatorDepo(); key: entity id.
    private ConcurrentMap<String, NEIndicatorDepo> localDepos = new ConcurrentHashMap<>();

    // Task this context was created for; its task id is stamped on every emitted value.
    private TaskInfo task;

    // Sequence id stamped on every emitted value; getValueFlux() filters on it by reference.
    private String sequenceId;

    // Not read anywhere in this class; presumably consulted by callers to trace commands.
    public boolean traceCmd = false;

    // key: neId, value: set of IndicatorName
    // NOTE(review): plain HashMap/HashSet; confirm the threading model of the callers.
    private Map<String, Set<String>> failedIndicators = new HashMap<>();

    // ids of the servers reported as disconnected
    // NOTE(review): plain HashSet; confirm the threading model of the callers.
    private Set<String> disconnectedServers = new HashSet<String>();

    // context-wide variables shared by all threads
    private ConcurrentMap<String, Object> variables = new ConcurrentHashMap<>();

    /** Query state of one indicator on one entity, as reported by {@code getIndicatorQueryStatus}. */
    public enum INDICATOR_QUERY_STATUS {
        /** Neither a value nor a failure is known yet. */
        WAITING,
        /** A value is available. */
        READY,
        /** The query of the indicator was reported as failed. */
        FAILED
    }

    // Carries the failure notifications emitted by sendValueFailure(); consumed via onIndicatorFailure().
    private EmitterProcessor<IndicatorValue> failureQueue = EmitterProcessor.create();

    // Carries every value passed to sendValue() of this instance; consumed via onValueSet().
    private EmitterProcessor<IndicatorValue> instanceValueQueue = EmitterProcessor.create();

    /**
     * Gets the evaluator for this thread, bound to the local depo of the given entity.
     *
     * @param entity entity whose depo the evaluator is pointed at
     * @return the evaluator returned by {@code ExprEvaluator.getEvaluator()}, with its depo set
     */
    public ExprEvaluator getExprEvaluator(EntityInfo entity) {
        final ExprEvaluator evaluator = ExprEvaluator.getEvaluator();
        final NEIndicatorDepo entityDepo = getNEIndicatorDepo(entity.id);
        evaluator.setDepo(entityDepo);
        return evaluator;
    }

    /**
     * Gets the evaluator and binds it to the local depo of the entity with the given id.
     *
     * @param entityId id of the entity whose depo the evaluator is pointed at
     * @return the evaluator returned by {@code ExprEvaluator.getEvaluator()}, with its depo set
     */
    public ExprEvaluator getExprEvaluator(String entityId) {
        final ExprEvaluator evaluator = ExprEvaluator.getEvaluator();
        final NEIndicatorDepo entityDepo = getNEIndicatorDepo(entityId);
        evaluator.setDepo(entityDepo);
        return evaluator;
    }

    /**
     * Gets the evaluator without touching its depo binding.
     *
     * @return whatever {@code ExprEvaluator.getEvaluator()} returns
     */
    public ExprEvaluator getExprEvaluator() {
        return ExprEvaluator.getEvaluator();
    }

    /**
     * @return the querier this context was constructed with
     */
    public Querier getQuerier() {
        return this.querier;
    }

    private class ValueCacheEntry {
        public String neId;

        public String IndicatorName;

        // how many values are expected
        public int sourceCount = 0;

        public int completedSourceCount = 0;

        public List<IndicatorValue> values = new CopyOnWriteArrayList<>();

        public BiFunction<QueryContext, List<IndicatorValue>, Object> postProcessAction;

        public ValueCacheEntry(String neId, String IndicatorName, int sourceCount) {
            this.neId = neId;
            this.IndicatorName = IndicatorName;
            this.sourceCount = sourceCount;
        }

        public void addValue(IndicatorValue indicatorValue, int sourceId) {
            values.add(indicatorValue);
            completedSourceCount++;

        }

        public boolean completed() {
            return completedSourceCount >= sourceCount;
        }
    }

    /**
     * Stores a context-wide variable, shared by all threads using this context.
     * An existing variable of the same name is replaced.
     *
     * @param variableName name of the variable
     * @param value        value to store
     */
    public void setVariable(String variableName, Object value) {
        this.variables.put(variableName, value);
    }

    /**
     * @return a fresh {@link HashMap} holding a copy of the context-wide variables; changes to the
     *         returned map do not affect this context
     */
    public Map<String, Object> getVariables() {
        final Map<String, Object> snapshot = new HashMap<>();
        snapshot.putAll(variables);
        return snapshot;
    }

    /**
     * @param variableName name of the context-wide variable
     * @return the stored value, or {@code null} when no variable of that name is set
     */
    public Object getVariable(String variableName) {
        return this.variables.get(variableName);
    }

    /**
     * The parameters used for the current query instance.
     * Held in a {@link ThreadLocal}, so each thread only sees the map it set itself.
     */
    private ThreadLocal<Map<String, String>> queryParameters = new ThreadLocal<Map<String, String>>();

    /**
     * Sets the parameters used for the current query instance on the calling thread.
     *
     * @param params parameter map to associate with the calling thread
     */
    public void setQueryParameters(Map<String, String> params) {
        this.queryParameters.set(params);
    }

    /**
     * Gets the parameters used for the current query instance.
     *
     * @return the map set by the calling thread, or {@code null} if it has set none
     */
    public Map<String, String> getQueryParameters() {
        return this.queryParameters.get();
    }

    /**
     * Gets a single parameter value used for the current query instance.
     *
     * @param parameterName name of the parameter
     * @return the value, or {@code null} when the calling thread has no parameter map or the map
     *         has no such entry
     */
    public String getQueryParameter(String parameterName) {
        final Map<String, String> current = queryParameters.get();
        if (current == null) {
            return null;
        }
        return current.get(parameterName);
    }

    // Multi-source aggregations registered through cacheValue(); key: cacheKey(neId, indicatorName).
    private ConcurrentHashMap<String, ValueCacheEntry> valueCache = new ConcurrentHashMap<>();

    /**
     * Creates the context of one task run.
     *
     * @param globalDepo namespace-level depo used to resolve entities and indicators
     * @param querier    querier exposed through {@link #getQuerier()}
     * @param sequenceId id stamped on every value emitted by this context
     * @param task       task whose id is stamped on every value emitted by this context
     */
    public QueryContext(NSIndicatorDepo globalDepo, Querier querier, String sequenceId, TaskInfo task) {
        this.task = task;
        this.sequenceId = sequenceId;
        this.querier = querier;
        this.globalDepo = globalDepo;
    }

    /**
     * Records the result of one indicator query.
     *
     * <p>If an aggregation was registered for the entity/indicator pair (see {@code cacheValue}),
     * the value is only added to it, and the aggregated value is produced once the aggregation
     * reports completion. Otherwise the expression fields are evaluated, the value is stored in
     * the entity's depo, and it is sent out.
     *
     * @param entity  entity the value belongs to
     * @param ind     indicator that was queried
     * @param queryId id of the source query, forwarded to the aggregation
     * @param obj     the queried value
     * @param costMs  cost of the query, stored on the value
     */
    public void setIndicatorValue(EntityInfo entity, Indicator ind, int queryId, Object obj, long costMs) {
        final long now = System.currentTimeMillis();
        final IndicatorValue produced = new IndicatorValue(entity.id, ind.name, obj);
        produced.cost = costMs;
        produced.taskId = task.getTaskId();
        produced.sequenceId = sequenceId;
        produced.queryTimeMillis = now;

        final ValueCacheEntry pending = valueCache.get(cacheKey(entity.id, ind.name));
        if (pending != null) {
            // part of a multi-source indicator: collect, and publish only when all parts are in
            pending.addValue(produced, queryId);
            if (pending.completed()) {
                onCachedValueComplete(pending);
            }
            return;
        }

        getExprEvaluator(entity).evalExprFields(ind, obj);
        getNEIndicatorDepo(entity.id).setIndicatorValue(produced);
        sendValue(produced);
    }

    /**
     * Returns the local depo of an entity, creating it on first use.
     *
     * <p>Creation is published with {@code putIfAbsent}, so when several threads ask for the same
     * entity at once they all get the same depo. A depo built by a losing thread is simply
     * discarded instead of replacing one that may already hold values.
     *
     * @param entityId id of the entity
     * @return the single depo registered for that entity in this context
     */
    public NEIndicatorDepo getNEIndicatorDepo(String entityId) {
        NEIndicatorDepo depo = localDepos.get(entityId);
        if (depo == null) {
            NEIndicatorDepo created = new NEIndicatorDepo(this.globalDepo.getDomain().getEntity(entityId),
                globalDepo);
            NEIndicatorDepo raced = localDepos.putIfAbsent(entityId, created);
            // keep whichever instance made it into the map first
            depo = raced != null ? raced : created;
        }
        return depo;
    }

    /**
     * Reports an error querying an indicator, to notify any query that depends on it.
     * Each entity/indicator pair is reported at most once; repeated calls are ignored.
     *
     * @param neId          id of the entity the indicator was queried on
     * @param indicatorName name of the failed indicator
     */
    public void setIndicatorFail(String neId, String indicatorName) {
        // The bookkeeping is a check-then-act on plain HashMap/HashSet, so it is done under a lock:
        // otherwise concurrent reporters could both pass the "already reported" check or corrupt
        // the map. The notification itself is sent outside the lock.
        synchronized (failedIndicators) {
            Set<String> indSet = failedIndicators.computeIfAbsent(neId, k -> new HashSet<>());
            if (!indSet.add(indicatorName)) {
                // already reported
                return;
            }
        }
        IndicatorValue iv = new IndicatorValue();
        iv.entityId = neId;
        iv.indicatorName = indicatorName;
        iv.queryTimeMillis = System.currentTimeMillis();
        iv.sequenceId = this.sequenceId;
        iv.taskId = this.task.getTaskId();
        iv.value = null;
        sendValueFailure(iv);
    }

    /**
     * Marks a server as disconnected for the rest of this context.
     *
     * @param neId id of the disconnected server
     */
    public void setDisconnectedServer(String neId) {
        this.disconnectedServers.add(neId);
    }

    /**
     * Tells whether a server was marked as disconnected, so that callers can avoid running
     * further queries against it.
     *
     * @param neId id of the server
     * @return {@code true} if {@code setDisconnectedServer} was called with this id
     */
    public boolean isDisconnectedServer(String neId) {
        return this.disconnectedServers.contains(neId);
    }

    /**
     * @return the task this context was created for
     */
    public TaskInfo getTask() {
        return this.task;
    }

    /**
     * @return the sequence id stamped on the values emitted by this context
     */
    public String getSequenceId() {
        return this.sequenceId;
    }

    /**
     * Pushes a failure notification to the failure queue.
     * Emission happens while holding this context's monitor, so calls are serialized.
     *
     * @param failure the failure notification to emit
     */
    private void sendValueFailure(IndicatorValue failure) {
        synchronized (this) {
            failureQueue.onNext(failure);
        }
    }

    /**
     * Gets a flux of the indicator values of this task: the shared main value stream, restricted
     * to values carrying this context's sequence id, and cut off at the first value for which
     * {@code isInternal()} returns true.
     *
     * @return the filtered flux
     */
    public Flux<IndicatorValue> getValueFlux() {
        // previous variant, kept for reference:
        // return mainValueProcessor.filter(v -> sequenceId.equals(v.sequenceId)).takeUntil(v -> v.isEnded());

        // Uses == instead of equals() to compare sequenceId: it is a little faster but dangerous.
        // Every IndicatorValue of this task must therefore reference this context's own sequenceId
        // object; a value carrying an equal but distinct String is not matched and is silently
        // missing from the flux.
        return mainValueProcessor.filter(v -> this.sequenceId == v.sequenceId).takeUntil(v -> v.isInternal());
    }

    /**
     * Registers a callback for the failure notifications of this context.
     *
     * @param action invoked with each failure notification
     * @return handle that cancels the subscription
     */
    public Disposable onIndicatorFailure(Consumer<IndicatorValue> action) {
        return failureQueue.subscribe(action);
    }

    /**
     * Registers a callback for the values sent through this context.
     *
     * @param action invoked with each value
     * @return handle that cancels the subscription
     */
    public Disposable onValueSet(Consumer<IndicatorValue> action) {
        return instanceValueQueue.subscribe(action);
    }

    /**
     * Registers an aggregation for an indicator whose value is assembled from several sources.
     * Values set for the pair are collected until {@code sourceCount} of them have arrived, then
     * combined by {@code postProcessAction}.
     *
     * <p>The first registration for a pair wins; later calls are ignored. Registration uses
     * {@code putIfAbsent}, so a concurrent second call can never replace an entry that is already
     * collecting values.
     *
     * @param neId              id of the entity
     * @param indicatorName     name of the indicator
     * @param sourceCount       number of partial values to wait for
     * @param postProcessAction combines the collected values into the final value
     */
    public void cacheValue(String neId, String indicatorName, int sourceCount,
        BiFunction<QueryContext, List<IndicatorValue>, Object> postProcessAction) {

        ValueCacheEntry entry = new ValueCacheEntry(neId, indicatorName, sourceCount);
        entry.postProcessAction = postProcessAction;
        valueCache.putIfAbsent(cacheKey(neId, indicatorName), entry);
    }

    /**
     * Turns a collected aggregation into one value: applies the entry's post-process action,
     * evaluates the indicator's expression fields on the result, stores it in the entity's depo
     * and sends it out.
     *
     * @param entry the aggregation to finish
     */
    private void onCachedValueComplete(ValueCacheEntry entry) {
        final IndicatorValue merged = new IndicatorValue();
        merged.entityId = entry.neId;
        merged.indicatorName = entry.IndicatorName;
        merged.queryTimeMillis = System.currentTimeMillis();
        merged.sequenceId = sequenceId;
        merged.taskId = task.getTaskId();
        merged.value = entry.postProcessAction.apply(this, entry.values);

        final Indicator indicator = globalDepo.getIndicatorNamespace().getIndicator(entry.IndicatorName);
        final ExprEvaluator evaluator = getExprEvaluator(entry.neId);
        evaluator.evalExprFields(indicator, merged.value);

        final NEIndicatorDepo entityDepo = getNEIndicatorDepo(merged.entityId);
        entityDepo.setIndicatorValue(merged);
        sendValue(merged);
    }

    /**
     * Runs the completion actions of the task, in this order:
     * flushes the aggregations that are still incomplete,
     * sends the failed indicators, if any,
     * sends the disconnected servers, if any,
     * and finally sends the task status message.
     *
     * @param status final status of the task, carried as the value of the status message
     */
    public void onComplete(IndicatorValue.STATUS status) {
        for (ValueCacheEntry pending : valueCache.values()) {
            if (pending.completed()) {
                continue;
            }
            // force a completion even though not all the queries returned
            onCachedValueComplete(pending);
        }

        if (!failedIndicators.isEmpty()) {
            sendFailedIndicators();
        }
        if (!disconnectedServers.isEmpty()) {
            sendDisconnectedIndicators();
        }

        final IndicatorValue statusMsg = new IndicatorValue();
        statusMsg.type = IndicatorValue.TYPE.TaskStatus;
        statusMsg.entityId = IndicatorValue.FLOW_CONTROL;
        statusMsg.value = status;
        statusMsg.sequenceId = sequenceId;
        statusMsg.taskId = task.getTaskId();
        statusMsg.queryTimeMillis = System.currentTimeMillis();
        sendValue(statusMsg);
    }

    /**
     * Disposal hook of the context. Currently a no-op: nothing is released here.
     */
    public void dispose() {
    }

    /**
     * Sends the failed entity/indicator map to the result queue as a single flow-control
     * message of type {@code FailedMsg}. The message carries the live map, not a copy.
     */
    private void sendFailedIndicators() {
        final IndicatorValue failedMsg = new IndicatorValue();
        failedMsg.type = IndicatorValue.TYPE.FailedMsg;
        failedMsg.entityId = IndicatorValue.FLOW_CONTROL;
        failedMsg.queryTimeMillis = System.currentTimeMillis();
        failedMsg.taskId = task.getTaskId();
        failedMsg.sequenceId = sequenceId;
        failedMsg.value = failedIndicators;
        sendValue(failedMsg);
    }

    /**
     * Sends the set of disconnected servers to the result queue as a single flow-control
     * message of type {@code DisconnMsg}. The message carries the live set, not a copy.
     */
    private void sendDisconnectedIndicators() {
        final IndicatorValue disconnMsg = new IndicatorValue();
        disconnMsg.type = IndicatorValue.TYPE.DisconnMsg;
        disconnMsg.entityId = IndicatorValue.FLOW_CONTROL;
        disconnMsg.queryTimeMillis = System.currentTimeMillis();
        disconnMsg.taskId = task.getTaskId();
        disconnMsg.sequenceId = sequenceId;
        disconnMsg.value = disconnectedServers;
        sendValue(disconnMsg);
    }

    /**
     * Builds the cache key of an entity/indicator pair: the two parts joined by {@code '#'}.
     * A {@code null} part is rendered as the text {@code "null"}.
     *
     * @param neId          id of the entity
     * @param indicatorName name of the indicator
     * @return {@code neId + "#" + indicatorName}
     */
    private String cacheKey(String neId, String indicatorName) {
        final StringBuilder key = new StringBuilder();
        key.append(neId).append("#").append(indicatorName);
        return key.toString();
    }

    /**
     * Publishes a value to the shared main flux and to the subscribers of this context.
     *
     * <p>For entries of type {@code Value} whose indicator declares an incremental rule, the rule
     * is applied through {@code IncrementalUtil.incrementalValue} before publishing. If the
     * indicator name cannot be resolved in the namespace, the incremental step is skipped and the
     * value is still published, instead of failing with a {@link NullPointerException}.
     *
     * @param value the value to publish
     */
    public void sendValue(IndicatorValue value) {
        if (IndicatorValue.TYPE.Value.equals(value.type)) {
            Indicator ind = globalDepo.getIndicatorNamespace().getIndicator(value.indicatorName);
            // ind is null when the name is not registered; there is then no rule to apply
            if (ind != null && StringUtils.isNoneBlank(ind.incremental)) {
                String incrementalCacheKey = cacheKey(value.entityId, value.indicatorName);
                logger.debug("Incremental ind:{}-{}", value.entityId, value.indicatorName);
                IncrementalUtil.incrementalValue(ind.incremental, incrementalCacheKey, value);
            }
        }
        sendValueToMainFlux(value);
        instanceValueQueue.onNext(value);
    }

    /**
     * @return the namespace-level depo this context was constructed with
     */
    public NSIndicatorDepo getDepo() {
        return this.globalDepo;
    }

    /**
     * Looks an entity up in the domain of the global depo.
     *
     * @param entityId id of the entity
     * @return whatever the domain returns for that id
     */
    public EntityInfo getEntity(String entityId) {
        final EntityInfo found = globalDepo.getDomain().getEntity(entityId);
        return found;
    }

    /**
     * Looks an indicator up in the indicator namespace of the global depo.
     *
     * @param indicatorName name of the indicator
     * @return whatever the namespace returns for that name
     */
    public Indicator getIndicator(String indicatorName) {
        final Indicator found = globalDepo.getIndicatorNamespace().getIndicator(indicatorName);
        return found;
    }

    /**
     * Reports the query state of an indicator on an entity.
     * A stored value takes precedence over a recorded failure.
     *
     * @param neId          id of the entity
     * @param indicatorName name of the indicator
     * @return {@code READY} if the entity's depo has a value for the indicator, {@code FAILED} if
     *         a failure was recorded for it, {@code WAITING} otherwise
     */
    public INDICATOR_QUERY_STATUS getIndicatorQueryStatus(String neId, String indicatorName) {
        if (getNEIndicatorDepo(neId).hasIndicatorValue(indicatorName)) {
            return INDICATOR_QUERY_STATUS.READY;
        }

        final Set<String> failedForEntity = failedIndicators.get(neId);
        final boolean failed = failedForEntity != null && failedForEntity.contains(indicatorName);
        if (failed) {
            return INDICATOR_QUERY_STATUS.FAILED;
        }
        return INDICATOR_QUERY_STATUS.WAITING;
    }

    /**
     * @param entityId      id of the entity
     * @param indicatorName name of the indicator
     * @return {@code true} when the query status of the pair is {@code FAILED}
     */
    public boolean isIndicatorFailed(String entityId, String indicatorName) {
        final INDICATOR_QUERY_STATUS current = getIndicatorQueryStatus(entityId, indicatorName);
        return current == INDICATOR_QUERY_STATUS.FAILED;
    }
    //////////////////// static members ////////////////////////////////

//    private static TopicProcessor<IndicatorValue> mainValueProcessor = ProcessUtil.createTopicProcessor("main-value", 32768, false);
    // Shared by every QueryContext: all values pass through it (see sendValueToMainFlux).
    // The second argument is presumably a buffer size - confirm against QueueProcessor.of.
    private static QueueProcessor<IndicatorValue> mainValueProcessor = QueueProcessor.of("main-value", 512);//ProcessUtil.createTopicProcessor("main-value", 32768, false);

//    /**
//     * Redefine the bufferSize of mainValueProcessor to prevent running out of memory
//     * @param bufferSize
//     */
//    public static void reDefineBufferSize(int bufferSize) {
//        synchronized (mainValueProcessor) {
//            mainValueProcessor = ProcessUtil.createTopicProcessor("main-value", bufferSize, false);
//        }
//    }

    /**
     * Subscribes a consumer to the shared main value stream, i.e. to the values of all contexts.
     *
     * @param subscriber invoked with each value
     */
    public static void subScribeAllValue(Consumer<? super IndicatorValue> subscriber) {
        QueryContext.mainValueProcessor.subscribe(subscriber);
    }

    /*public static void subScribeAllValue(Subscriber<IndicatorValue> subscriber) {
        synchronized (mainValueProcessor) {
            mainValueProcessor.subscribe(subscriber);
        }
    }*/

    /*public static FluxProcessor<IndicatorValue, IndicatorValue> getMainValueFlux() {
        return mainValueProcessor;
    }*/

    // Number of values pushed to the main value stream so far, across all contexts.
    private static AtomicLong valueSentCount = new AtomicLong(0);

    /**
     * @return how many values have been pushed to the main value stream so far
     */
    public static long getValueSentCount() {
        return QueryContext.valueSentCount.get();
    }

    /**
     * Pushes a value to the shared main value stream and counts it.
     * Both steps run while holding the processor's monitor, so pushes are serialized.
     *
     * @param v the value to push
     */
    private static void sendValueToMainFlux(IndicatorValue v) {
        synchronized (QueryContext.mainValueProcessor) {
            QueryContext.mainValueProcessor.next(v);
            QueryContext.valueSentCount.incrementAndGet();
        }
    }

}
